#!/usr/bin/env ruby

require 'rubygems'
require 'json/ext'
require 'redis'
require 'restclient'
require 'optparse'
require 'timeout'
require 'pp'

# rubocop:disable Loop, MethodLength

options = {}

# Command line interface: the only required input is the path of a JSON
# configuration file (-c).
optparse = OptionParser.new do |opts|
  opts.banner = 'Usage: sabisu_uploader [options]'

  opts.on('-c', '--config FILE', 'configuration file (json format)') do |json|
    options[:config] = json
  end

  opts.on('-h', '--help', 'Display this screen') do
    puts opts
    exit
  end
end
optparse.parse!

if options[:config].nil?
  puts 'Must give configuration file (-c)'
  exit 1
else
  # keys are symbolized, so the rest of the script reads config[:redis], etc.
  config = JSON.parse(File.read(options[:config]), symbolize_names: true)

  # every redis setting has a default, so the whole section may be left out
  config[:redis] ||= {}
  config[:redis][:server] = 'localhost' unless config[:redis].key?(:server)
  config[:redis][:port] = 6379          unless config[:redis].key?(:port)
  config[:redis][:db] = 0               unless config[:redis].key?(:db)
  config[:redis][:list] = 'events'      unless config[:redis].key?(:list)

  # the dbs sub-section is optional; create it before filling in defaults so
  # that a config without it does not fail with NoMethodError on nil
  config[:cloudant][:dbs] ||= {}
  config[:cloudant][:dbs][:current] = 'sensu_current' unless config[:cloudant][:dbs].key?(:current)
  config[:cloudant][:dbs][:history] = 'sensu_history' unless config[:cloudant][:dbs].key?(:history)

  # batch limits fall back to their defaults when missing OR set to 0:
  # a batch size of 0 would end the collection loop before anything is popped,
  # and Timeout.timeout(0) runs its block without any timeout at all.
  [[:batch_max_events, 20], [:batch_max_time, 5]].each do |key, default|
    config[key] = default if config[key].nil? || config[key] == 0
  end
end

# cloudant class to send sensu events to cloudant dbs in bulk
# https://wiki.apache.org/couchdb/HTTP_Bulk_Document_API
class Cloudant
  # HTTP verbs whose RestClient helper takes a request body as second argument
  # (RestClient.post(url, payload, headers)); the other helpers, such as
  # RestClient.get(url, headers), accept no payload.
  PAYLOAD_METHODS = [:post, :put, :patch].freeze

  # username      - cloudant account name, used as the host prefix
  # cred_user     - user name for HTTP basic auth
  # cred_password - password for HTTP basic auth
  # current_db    - name of the db holding the latest state of each check
  # history_db    - name of the db that receives the event history
  def initialize(username, cred_user, cred_password, current_db, history_db)
    @username = username
    @credentials = { username: cred_user, password: cred_password }
    @current_db = make_url(current_db)
    @history_db = make_url(history_db)
  end

  # Build the base url of a database, with the credentials embedded.
  # NOTE: the credentials are interpolated verbatim, so they must already be
  #       safe to use inside a url.
  def make_url(db)
    "https://#{@credentials[:username]}:#{@credentials[:password]}@" \
    "#{@username}.cloudant.com/#{db}"
  end

  # Perform a JSON request against "<db>/<path>" and return the parsed
  # response with symbolized keys.
  #
  # db     - base url of the database (as built by make_url)
  # path   - path (and query string) below the database url
  # method - name of the RestClient helper to call (:get, :post, ...)
  # data   - request body, serialized to JSON; only sent for the verbs in
  #          PAYLOAD_METHODS, because the other RestClient helpers would
  #          raise ArgumentError when given a payload argument
  #
  # Exceptions raised by RestClient or JSON.parse are not rescued here.
  def request(db, path, method = :get, data = {})
    args = ["#{db}/#{path}"]
    args << data.to_json if PAYLOAD_METHODS.include?(method.to_sym)
    args << { content_type: :json, accept: :json }

    JSON.parse(RestClient.public_send(method, *args), symbolize_names: true)
  end

  # get the last known event
  #
  # ids - document ids to look up in the current db
  #
  # Returns the stored documents that were found (rows reporting an error or
  # carrying no document are dropped), or nil when the request is answered
  # with a 404.
  def last_events(ids)
    rows = request(@current_db, '_all_docs?include_docs=true', :post, keys: ids)[:rows]
    rows.map { |row| row[:doc] unless row.key?(:error) }.compact
  rescue RestClient::ResourceNotFound
    nil
  end

  # Bulk-upload the given documents to the history db.
  def send_history(events)
    request(@history_db, '_bulk_docs', :post, docs: events)
  end

  # Bulk-upload the given documents to the current db.
  def send_current(events)
    request(@current_db, '_bulk_docs', :post, docs: events)
  end
end

# Identifier of an event: "<client name>/<check name>".
def format_check(event)
  client_name = event[:client][:name]
  check_name = event[:check][:name]
  format('%s/%s', client_name, check_name)
end

# Work out which documents have to be written for a batch of events and
# upload them in bulk.
#
# events   - parsed sensu events (hashes with symbol keys); the :check hash
#            of an event may get a :state_change entry added
# cloudant - object responding to last_events(ids), send_current(docs) and
#            send_history(docs)
#
# Returns { current: <result of send_current>, history: <result of send_history> }.
# Errors raised by the cloudant object are not rescued here.
def process_events(events, cloudant)
  starttime = Time.now
  current_updates = []
  history_updates = []

  # last_events answers nil when the current db lookup hits a 404; treat that
  # as "nothing known yet" instead of failing on nil below
  last_events = cloudant.last_events(events.map { |e| format_check(e) }) || []

  events.each do |event|
    id = format_check(event)
    prefix = "  #{id} | "

    # find last known event for this event
    last_event = last_events.find do |e|
      e[:event][:client][:name] == event[:client][:name] &&
      e[:event][:check][:name] == event[:check][:name]
    end

    # did the current event occur BEFORE the last one?
    # this can occur if the sending of some events to the cloudant dbs failed,
    # and the events are now arriving out-of-order.
    out_of_order = last_event &&
                   last_event[:event][:check][:issued].to_i >= event[:check][:issued].to_i

    if event[:check][:status] == 0
      # check has resolved, delete it from current db.
      # but ignore if it's out-of-order or the check doesn't exist for some reason
      if last_event && !out_of_order
        current_updates << { _id: id, _rev: last_event[:_rev], _deleted: true }
        puts prefix + 'Recovery (deleted current doc)'
      end

      # add to history
      history_updates << { event: event }

    elsif last_event.nil?
      # event doesn't exist yet, create it
      # NOTE: there's a possible consistency issue here, in case an alert comes
      #       in that has a timestamp BEFORE the last resolve. in that case, we
      #       should simply ignore it, but we don't have any info on when it was
      #       resolved. this case should be pretty rare. if it does happen, the
      #       current db can be cleaned up by calling the following API endpoint
      #       in sabisu:
      #         /api/events/stale?clear_recovered=true
      event[:check][:state_change] = event[:check][:issued]
      current_updates << { _id: id, event: event }
      history_updates << { event: event }
      puts prefix + 'Created (first occurrence)'
    else
      # check if state has changed; the time of the last state change is
      # carried over from the stored event when the status is the same
      state_changed = last_event[:event][:check][:status] != event[:check][:status]
      if state_changed
        event[:check][:state_change] = event[:check][:issued]
      else
        event[:check][:state_change] = last_event[:event][:check][:state_change]
      end

      # check if output has changed
      output_changed = last_event[:event][:check][:output] != event[:check][:output]

      # only update the current state if the event is newer than what was last written
      unless out_of_order
        if state_changed
          current_updates << { _id: id, _rev: last_event[:_rev], event: event }
          history_updates << { event: event }
          puts prefix + 'Updated (state change)'
        elsif output_changed
          # an output-only change updates the current db but adds no history
          current_updates << { _id: id, _rev: last_event[:_rev], event: event }
          puts prefix + 'Updated (output change)'
        end
      end
    end
  end

  current_results = cloudant.send_current(current_updates)
  history_results = cloudant.send_history(history_updates)

  puts "Finished #{events.length} events in #{((Time.now - starttime) * 1000).round(3)} ms"
  events.each { |e| puts "  #{format_check(e)}" }
  { current: current_results, history: history_results }
end

# Re-queue an event by pushing its JSON form onto the configured redis list
# (rpush). A nil event is ignored. Any StandardError raised on the way is
# reported on stdout instead of being propagated.
def put_back(event, config, redis)
  return if event.nil?

  queue = config[:redis][:list]
  redis.rpush(queue, event.to_json)
  puts "  #{format_check(event)} | Put event back on redis queue"
rescue StandardError => e
  check_id = format_check(event)
  trace = e.backtrace.join("\n")
  puts "ERROR: Unable to add the event (#{check_id}) back " \
       "on the redis queue. It will be lost forever.\n" \
       "#{e.message}\n#{trace}\n"
end

# NOTE: nothing below ever sets quit to true, so the loop runs until the
#       process is terminated from outside.
quit = false
redis = Redis.new(
  host: config[:redis][:server],
  port: config[:redis][:port],
  db: config[:redis][:db]
)
cloudant = Cloudant.new(
  config[:cloudant][:username],
  config[:cloudant][:auth][:username],
  config[:cloudant][:auth][:password],
  config[:cloudant][:dbs][:current],
  config[:cloudant][:dbs][:history]
)

# seconds to wait before polling an empty queue again
empty_queue_pause = 0.1
# seconds to wait after a failed attempt to read from redis
redis_error_pause = 1

until quit
  # collect a batch: stop after batch_max_events events or batch_max_time
  # seconds, whichever comes first
  events = [] # init events in case redis.rpop fails first go
  begin
    Timeout.timeout(config[:batch_max_time]) do
      start_time = Time.now
      until events.length >= config[:batch_max_events]
        tmp_event = redis.rpop(config[:redis][:list])
        if tmp_event
          events << JSON.parse(tmp_event, symbolize_names: true)
        else
          # rpop returns nil right away on an empty list; pause briefly so an
          # idle queue is polled instead of being hammered in a tight loop
          sleep empty_queue_pause
        end
      end
      puts "We have #{events.length} events, collected in " \
           "#{format('%.1f', (Time.now - start_time))} seconds, let's process them."
    end
  rescue Timeout::Error
    unless events.empty?
      puts "It's been #{config[:batch_max_time]} seconds, time to " \
           "process #{events.length} events."
    end
  rescue StandardError => err
    puts "ERROR while retrieving event from redis.\n" +
         err.message + "\n" + err.backtrace.join("\n") + "\n"
    # back off before the next attempt so a persistent failure does not turn
    # into a busy loop of error messages
    sleep redis_error_pause
  end

  # dedupe events by only keeping the first (sorted by issued time),
  # and pushing the rest back on the redis queue.
  # issued is compared as an integer, the same way process_events does.
  existing_ids = []
  events = events.sort_by { |e| e[:check][:issued].to_i }.map do |e|
    check_name = format_check(e)
    if existing_ids.include?(check_name)
      put_back(e, config, redis)
      nil
    else
      existing_ids << check_name
      e
    end
  end.compact

  # process the events
  begin
    unless events.empty?
      results = process_events(events, cloudant)
      # a result without a :rev means that document was not written to the
      # current db; put only the matching event back
      results[:current].each do |r|
        unless r.key?(:rev)
          event = events.find { |e| format_check(e) == r[:id] }
          puts 'ERROR: Failed to update the Cloudant databases. ' \
               "Pushing this event back on the head of the redis queue.\n" +
               r.to_json + "\n"
          put_back(event, config, redis)
        end
      end
    end
  rescue StandardError => err2
    # something went wrong, assume none of the events got uploaded.
    # this should be ok, because if we upload the same event twice,
    # the second one will be dropped due to the same timestamp.
    puts 'ERROR: Failed to update the Cloudant databases. ' \
         "Pushing all events back on the head of the redis queue.\n" +
         err2.message + "\n" + err2.backtrace.join("\n") + "\n"
    events.each { |e| put_back(e, config, redis) }
  end
end
